// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Search is a sample web server that uses Cloud Bigtable as the storage layer
// for a simple document-storage and full-text-search service.
// It has four functions:
// - Initialize and clear the table.
// - Add a document.  This adds the content of a user-supplied document to the
//   Bigtable, and adds references to the document to an index in the Bigtable.
//   The document is indexed under each unique word in the document.
// - Search the index.  This returns documents containing each word in a user
//   query, with snippets and links to view the whole document.
// - Copy table.  This copies the documents and index from another table and
//   adds them to the current one.
package main

import (
	"bytes"
	"context"
	"flag"
	"fmt"
	"html/template"
	"io"
	"log"
	"net/http"
	"strconv"
	"strings"
	"sync"
	"time"
	"unicode"

	"cloud.google.com/go/bigtable"
)

var (
	addTemplate = template.Must(template.New("").Parse(`<html><body>
Added {{.Title}}
</body></html>`))

	contentTemplate = template.Must(template.New("").Parse(`<html><body>
<b>{{.Title}}</b><br><br>
{{.Content}}
</body></html>`))

	searchTemplate = template.Must(template.New("").Parse(`<html><body>
Results for <b>{{.Query}}</b>:<br><br>
{{range .Results}}
<a href="/content?name={{.Title}}">{{.Title}}</a><br>
<i>{{.Snippet}}</i><br><br>
{{end}}
</body></html>`))
)

const (
	indexColumnFamily   = "i"
	contentColumnFamily = "c"
	mainPage            = `
	<html>
		<head>
			<title>Document Search</title>
		</head>
		<body>
			Initialize and clear table:
			<form action="/reset" method="post">
				<div><input type="submit" value="Init"></div>
			</form>

			Search for documents:
			<form action="/search" method="post">
				<div><input type="text" name="q" size=80></div>
				<div><input type="submit" value="Search"></div>
			</form>

			Add a document:
			<form action="/add" method="post">
				Document name:
				<div><textarea name="name" rows="1" cols="80"></textarea></div>
				Document text:
				<div><textarea name="content" rows="20" cols="80"></textarea></div>
				<div><input type="submit" value="Submit"></div>
			</form>

			Copy data from another table:
			<form action="/copy" method="post">
				Source table name:
				<div><input type="text" name="name" size=80></div>
				<div><input type="submit" value="Copy"></div>
			</form>
		</body>
	</html>
	`
)

func main() {
	var (
		project   = flag.String("project", "", "The name of the project.")
		instance  = flag.String("instance", "", "The name of the Cloud Bigtable instance.")
		tableName = flag.String("table", "docindex", "The name of the table containing the documents and index.")
		port      = flag.Int("port", 8080, "TCP port for server.")
	)
	flag.Parse()

	// Make an admin client.
	adminClient, err := bigtable.NewAdminClient(context.Background(), *project, *instance)
	if err != nil {
		log.Fatal("Bigtable NewAdminClient:", err)
	}

	// Make a regular client.
	client, err := bigtable.NewClient(context.Background(), *project, *instance)
	if err != nil {
		log.Fatal("Bigtable NewClient:", err)
	}

	// Open the table.
	table := client.Open(*tableName)

	// Set up HTML handlers, and start the web server.
	http.HandleFunc("/search", func(w http.ResponseWriter, r *http.Request) { handleSearch(w, r, table) })
	http.HandleFunc("/content", func(w http.ResponseWriter, r *http.Request) { handleContent(w, r, table) })
	http.HandleFunc("/add", func(w http.ResponseWriter, r *http.Request) { handleAddDoc(w, r, table) })
	http.HandleFunc("/reset", func(w http.ResponseWriter, r *http.Request) { handleReset(w, r, *tableName, adminClient) })
	http.HandleFunc("/copy", func(w http.ResponseWriter, r *http.Request) { handleCopy(w, r, *tableName, client, adminClient) })
	http.HandleFunc("/", handleMain)
	if err := http.ListenAndServe(":"+strconv.Itoa(*port), nil); err != nil {
		log.Fatal(err)
	}
}

// handleMain outputs the home page.
func handleMain(w http.ResponseWriter, r *http.Request) {
	io.WriteString(w, mainPage)
}

// tokenize splits a string into tokens.
// This is very simple, it's not a good tokenization function.
func tokenize(s string) []string {
	wordMap := make(map[string]bool)
	f := strings.FieldsFunc(s, func(r rune) bool { return !unicode.IsLetter(r) })
	for _, word := range f {
		word = strings.ToLower(word)
		wordMap[word] = true
	}
	words := make([]string, 0, len(wordMap))
	for word := range wordMap {
		words = append(words, word)
	}
	return words
}

// handleContent fetches the content of a document from the Bigtable and returns it.
func handleContent(w http.ResponseWriter, r *http.Request, table *bigtable.Table) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	name := r.FormValue("name")
	if len(name) == 0 {
		http.Error(w, "No document name supplied.", http.StatusBadRequest)
		return
	}

	row, err := table.ReadRow(ctx, name)
	if err != nil {
		http.Error(w, "Error reading content: "+err.Error(), http.StatusInternalServerError)
		return
	}
	content := row[contentColumnFamily]
	if len(content) == 0 {
		http.Error(w, "Document not found.", http.StatusNotFound)
		return
	}
	var buf bytes.Buffer
	if err := contentTemplate.ExecuteTemplate(&buf, "", struct{ Title, Content string }{name, string(content[0].Value)}); err != nil {
		http.Error(w, "Error executing HTML template: "+err.Error(), http.StatusInternalServerError)
		return
	}
	io.Copy(w, &buf)
}

// handleSearch responds to search queries, returning links and snippets for matching documents.
func handleSearch(w http.ResponseWriter, r *http.Request, table *bigtable.Table) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	query := r.FormValue("q")
	// Split the query into words.
	words := tokenize(query)
	if len(words) == 0 {
		http.Error(w, "Empty query.", http.StatusBadRequest)
		return
	}

	// readRows reads from many rows concurrently.
	readRows := func(rows []string) ([]bigtable.Row, error) {
		results := make([]bigtable.Row, len(rows))
		errors := make([]error, len(rows))
		var wg sync.WaitGroup
		for i, row := range rows {
			wg.Add(1)
			go func(i int, row string) {
				defer wg.Done()
				results[i], errors[i] = table.ReadRow(ctx, row, bigtable.RowFilter(bigtable.LatestNFilter(1)))
			}(i, row)
		}
		wg.Wait()
		for _, err := range errors {
			if err != nil {
				return nil, err
			}
		}
		return results, nil
	}

	// For each query word, get the list of documents containing it.
	results, err := readRows(words)
	if err != nil {
		http.Error(w, "Error reading index: "+err.Error(), http.StatusInternalServerError)
		return
	}

	// Count how many of the query words each result contained.
	hits := make(map[string]int)
	for _, r := range results {
		for _, r := range r[indexColumnFamily] {
			hits[r.Column]++
		}
	}

	// Build a slice of all the documents that matched every query word.
	var matches []string
	for doc, count := range hits {
		if count == len(words) {
			matches = append(matches, doc[len(indexColumnFamily+":"):])
		}
	}

	// Fetch the content of those documents from the Bigtable.
	content, err := readRows(matches)
	if err != nil {
		http.Error(w, "Error reading results: "+err.Error(), http.StatusInternalServerError)
		return
	}

	type result struct{ Title, Snippet string }
	data := struct {
		Query   string
		Results []result
	}{query, nil}

	// Output links and snippets.
	for i, doc := range matches {
		var text string
		c := content[i][contentColumnFamily]
		if len(c) > 0 {
			text = string(c[0].Value)
		}
		if len(text) > 100 {
			text = text[:100] + "..."
		}
		data.Results = append(data.Results, result{doc, text})
	}
	var buf bytes.Buffer
	if err := searchTemplate.ExecuteTemplate(&buf, "", data); err != nil {
		http.Error(w, "Error executing HTML template: "+err.Error(), http.StatusInternalServerError)
		return
	}
	io.Copy(w, &buf)
}

// handleAddDoc adds a document to the index.
func handleAddDoc(w http.ResponseWriter, r *http.Request, table *bigtable.Table) {
	if r.Method != "POST" {
		http.Error(w, "POST requests only", http.StatusMethodNotAllowed)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	name := r.FormValue("name")
	if len(name) == 0 {
		http.Error(w, "Empty document name!", http.StatusBadRequest)
		return
	}

	content := r.FormValue("content")
	if len(content) == 0 {
		http.Error(w, "Empty document content!", http.StatusBadRequest)
		return
	}

	var (
		writeErr error          // Set if any write fails.
		mu       sync.Mutex     // Protects writeErr
		wg       sync.WaitGroup // Used to wait for all writes to finish.
	)

	// writeOneColumn writes one column in one row, updates err if there is an error,
	// and signals wg that one operation has finished.
	writeOneColumn := func(row, family, column, value string, ts bigtable.Timestamp) {
		mut := bigtable.NewMutation()
		mut.Set(family, column, ts, []byte(value))
		err := table.Apply(ctx, row, mut)
		if err != nil {
			mu.Lock()
			writeErr = err
			mu.Unlock()
		}
	}

	// Start a write to store the document content.
	wg.Add(1)
	go func() {
		writeOneColumn(name, contentColumnFamily, "", content, bigtable.Now())
		wg.Done()
	}()

	// Start writes to store the document name in the index for each word in the document.
	words := tokenize(content)
	for _, word := range words {
		var (
			row    = word
			family = indexColumnFamily
			column = name
			value  = ""
			ts     = bigtable.Now()
		)
		wg.Add(1)
		go func() {
			// TODO: should use a semaphore to limit the number of concurrent writes.
			writeOneColumn(row, family, column, value, ts)
			wg.Done()
		}()
	}
	wg.Wait()
	if writeErr != nil {
		http.Error(w, "Error writing to Bigtable: "+writeErr.Error(), http.StatusInternalServerError)
		return
	}
	var buf bytes.Buffer
	if err := addTemplate.ExecuteTemplate(&buf, "", struct{ Title string }{name}); err != nil {
		http.Error(w, "Error executing HTML template: "+err.Error(), http.StatusInternalServerError)
		return
	}
	io.Copy(w, &buf)
}

// handleReset deletes the table if it exists, creates it again, and creates its column families.
func handleReset(w http.ResponseWriter, r *http.Request, table string, adminClient *bigtable.AdminClient) {
	if r.Method != "POST" {
		http.Error(w, "POST requests only", http.StatusMethodNotAllowed)
		return
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	adminClient.DeleteTable(ctx, table)
	if err := adminClient.CreateTable(ctx, table); err != nil {
		http.Error(w, "Error creating Bigtable: "+err.Error(), http.StatusInternalServerError)
		return
	}
	time.Sleep(20 * time.Second)
	// Create two column families, and set the GC policy for each one to keep one version.
	for _, family := range []string{indexColumnFamily, contentColumnFamily} {
		if err := adminClient.CreateColumnFamily(ctx, table, family); err != nil {
			http.Error(w, "Error creating column family: "+err.Error(), http.StatusInternalServerError)
			return
		}
		if err := adminClient.SetGCPolicy(ctx, table, family, bigtable.MaxVersionsPolicy(1)); err != nil {
			http.Error(w, "Error setting GC policy: "+err.Error(), http.StatusInternalServerError)
			return
		}
	}
	w.Write([]byte("<html><body>Done.</body></html>"))
	return
}

// copyTable copies data from one table to another.
func copyTable(src, dst string, client *bigtable.Client, adminClient *bigtable.AdminClient) error {
	if src == "" || src == dst {
		return nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	// Open the source and destination tables.
	srcTable := client.Open(src)
	dstTable := client.Open(dst)

	var (
		writeErr error          // Set if any write fails.
		mu       sync.Mutex     // Protects writeErr
		wg       sync.WaitGroup // Used to wait for all writes to finish.
	)
	copyRowToTable := func(row bigtable.Row) bool {
		mu.Lock()
		failed := writeErr != nil
		mu.Unlock()
		if failed {
			return false
		}
		mut := bigtable.NewMutation()
		for family, items := range row {
			for _, item := range items {
				// Get the column name, excluding the column family name and ':' character.
				columnWithoutFamily := item.Column[len(family)+1:]
				mut.Set(family, columnWithoutFamily, bigtable.Now(), item.Value)
			}
		}
		wg.Add(1)
		go func() {
			// TODO: should use a semaphore to limit the number of concurrent writes.
			if err := dstTable.Apply(ctx, row.Key(), mut); err != nil {
				mu.Lock()
				writeErr = err
				mu.Unlock()
			}
			wg.Done()
		}()
		return true
	}

	// Create a filter that only accepts the column families we're interested in.
	filter := bigtable.FamilyFilter(indexColumnFamily + "|" + contentColumnFamily)
	// Read every row from srcTable, and call copyRowToTable to copy it to our table.
	err := srcTable.ReadRows(ctx, bigtable.InfiniteRange(""), copyRowToTable, bigtable.RowFilter(filter))
	wg.Wait()
	if err != nil {
		return err
	}
	return writeErr
}

// handleCopy copies data from one table to another.
func handleCopy(w http.ResponseWriter, r *http.Request, dst string, client *bigtable.Client, adminClient *bigtable.AdminClient) {
	if r.Method != "POST" {
		http.Error(w, "POST requests only", http.StatusMethodNotAllowed)
		return
	}
	src := r.FormValue("name")
	if src == "" {
		http.Error(w, "No source table specified.", http.StatusBadRequest)
		return
	}
	if err := copyTable(src, dst, client, adminClient); err != nil {
		http.Error(w, "Failed to rebuild index: "+err.Error(), http.StatusInternalServerError)
		return
	}
	fmt.Fprint(w, "Copied table.\n")
}
